/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.bff.gaia.unified.sdk.io.hdfs;

import com.bff.gaia.unified.sdk.io.FileSystem;
import com.bff.gaia.unified.sdk.io.fs.CreateOptions;
import com.bff.gaia.unified.sdk.io.fs.MatchResult;
import com.bff.gaia.unified.sdk.io.fs.MatchResult.Metadata;
import com.bff.gaia.unified.sdk.io.fs.MatchResult.Status;
import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;
import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableList;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Adapts {@link org.apache.hadoop.fs.FileSystem} connectors to be used as Apache Unified {@link
 * FileSystem FileSystems}.
 *
 * <p>The following HDFS FileSystems are known to be unsupported:
 *
 * <ul>
 *   <li>FTPFileSystem: Missing seek support within FTPInputStream
 * </ul>
 *
 * <p>This implementation assumes that the underlying Hadoop {@link FileSystem} is seek efficient
 * when reading. The source code for the following {@link org.apache.hadoop.fs.FSInputStream}
 * implementations (as of Hadoop 2.7.1) provides efficient seek implementations:
 *
 * <ul>
 *   <li>HarFsInputStream
 *   <li>S3InputStream
 *   <li>DFSInputStream
 *   <li>SwiftNativeInputStream
 *   <li>NativeS3FsInputStream
 *   <li>LocalFSFileInputStream
 *   <li>NativeAzureFsInputStream
 *   <li>S3AInputStream
 * </ul>
 */

class HadoopFileSystem extends FileSystem<HadoopResourceId> {
  private static final Logger LOG = LoggerFactory.getLogger(HadoopFileSystem.class);

  @VisibleForTesting static final String LOG_CREATE_DIRECTORY = "Creating directory %s";
  @VisibleForTesting static final String LOG_DELETING_EXISTING_FILE = "Deleting existing file %s";
  @VisibleForTesting final org.apache.hadoop.fs.FileSystem fileSystem;

  HadoopFileSystem(Configuration configuration) throws IOException {
    this.fileSystem = org.apache.hadoop.fs.FileSystem.newInstance(configuration);
  }
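
  // Illustrative sketch (an assumption, not existing code in this SDK): constructing the adapter
  // directly, e.g. from a package-local test, against a hypothetical namenode endpoint.
  //
  //   Configuration conf = new Configuration();
  //   conf.set("fs.defaultFS", "hdfs://namenode:8020");
  //   HadoopFileSystem fs = new HadoopFileSystem(conf);
  //   List<MatchResult> results =
  //       fs.match(Collections.singletonList("hdfs://namenode:8020/logs/*.gz"));
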
  @Override
  protected List<MatchResult> match(List<String> specs) {
    ImmutableList.Builder<MatchResult> resultsBuilder = ImmutableList.builder();
    for (String spec : specs) {
      try {
        Set<Metadata> metadata = new HashSet<>();

        FileStatus[] fileStatuses = fileSystem.globStatus(new Path(spec));
        if (fileStatuses != null) {
          for (FileStatus fileStatus : fileStatuses) {
            if (fileStatus.isFile()) {
              metadata.add(toMetadata(fileStatus));
            }
          }
        }

        // Hadoop's globStatus does not implement recursive ("**") matching, so expand it by hand:
        // split the spec at the first "*" of the "**" and recurse over the matching directories.
        if (spec.contains("**")) {
          int index = spec.indexOf("**");
          metadata.addAll(
              matchRecursiveGlob(spec.substring(0, index + 1), spec.substring(index + 1)));
        }

        if (metadata.isEmpty()) {
          resultsBuilder.add(MatchResult.create(Status.NOT_FOUND, Collections.emptyList()));
        } else {
          resultsBuilder.add(MatchResult.create(Status.OK, new ArrayList<>(metadata)));
        }
      } catch (IOException e) {
        resultsBuilder.add(MatchResult.create(Status.ERROR, e));
      }
    }
    return resultsBuilder.build();
  }
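
  // For example (hypothetical path), the spec "hdfs://namenode:8020/logs/**/*.gz" is first passed
  // to globStatus as-is and then, because it contains "**", split at the recursive wildcard into
  // the directory spec "hdfs://namenode:8020/logs/*" and the file spec "*/*.gz" for
  // matchRecursiveGlob below.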
  private Set<Metadata> matchRecursiveGlob(String directorySpec, String fileSpec)
      throws IOException {
    Set<Metadata> metadata = new HashSet<>();
    if (directorySpec.contains("*")) {
      // An abstract directory with a wildcard is converted to concrete directories to search.
      FileStatus[] directoryStatuses = fileSystem.globStatus(new Path(directorySpec));
      for (FileStatus directoryStatus : directoryStatuses) {
        if (directoryStatus.isDirectory()) {
          metadata.addAll(
              matchRecursiveGlob(directoryStatus.getPath().toUri().toString(), fileSpec));
        }
      }
    } else {
      // A concrete directory is searched.
      FileStatus[] fileStatuses = fileSystem.globStatus(new Path(directorySpec + "/" + fileSpec));
      for (FileStatus fileStatus : fileStatuses) {
        if (fileStatus.isFile()) {
          metadata.add(toMetadata(fileStatus));
        }
      }

      // All sub-directories of a concrete directory are searched.
      FileStatus[] directoryStatuses = fileSystem.globStatus(new Path(directorySpec + "/*"));
      for (FileStatus directoryStatus : directoryStatuses) {
        if (directoryStatus.isDirectory()) {
          metadata.addAll(
              matchRecursiveGlob(directoryStatus.getPath().toUri().toString(), fileSpec));
        }
      }

      // Handle additional instances of recursive globs.
      if (fileSpec.contains("**")) {
        int index = fileSpec.indexOf("**");
        metadata.addAll(
            matchRecursiveGlob(
                directorySpec + "/" + fileSpec.substring(0, index + 1),
                fileSpec.substring(index + 1)));
      }
    }
    return metadata;
  }
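
  // Worked example (hypothetical layout): matchRecursiveGlob("hdfs://namenode:8020/logs/*",
  // "*/*.gz") expands the wildcard directory spec to the concrete directories it matches, e.g.
  // "hdfs://namenode:8020/logs/2024", and then searches each such directory and, recursively, all
  // of its sub-directories for files matching the remainder of the spec.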

  private Metadata toMetadata(FileStatus fileStatus) {
    URI uri = dropEmptyAuthority(fileStatus.getPath().toUri().toString());
    return Metadata.builder()
        .setResourceId(new HadoopResourceId(uri))
        .setIsReadSeekEfficient(true)
        .setSizeBytes(fileStatus.getLen())
        .setLastModifiedMillis(fileStatus.getModificationTime())
        .build();
  }
  @Override
  protected WritableByteChannel create(HadoopResourceId resourceId, CreateOptions createOptions)
      throws IOException {
    return Channels.newChannel(fileSystem.create(resourceId.toPath()));
  }

  @Override
  protected ReadableByteChannel open(HadoopResourceId resourceId) throws IOException {
    FileStatus fileStatus = fileSystem.getFileStatus(resourceId.toPath());
    return new HadoopSeekableByteChannel(fileStatus, fileSystem.open(resourceId.toPath()));
  }
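
  // Illustrative sketch: the channel returned by open is seekable, so package-local callers can
  // jump around a file without re-opening it (fs and resourceId below are hypothetical).
  //
  //   SeekableByteChannel channel = (SeekableByteChannel) fs.open(resourceId);
  //   channel.position(1024); // seek directly to byte offset 1024
  //   ByteBuffer buffer = ByteBuffer.allocate(4096); // heap buffer; see read() below
  //   channel.read(buffer);
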
  @Override
  protected void copy(List<HadoopResourceId> srcResourceIds, List<HadoopResourceId> destResourceIds)
      throws IOException {
    for (int i = 0; i < srcResourceIds.size(); ++i) {
      // Unfortunately HDFS FileSystems don't support a native copy operation, so we are forced to
      // use the inefficient implementation found in FileUtil, which copies all the bytes through
      // the local machine.
      //
      // HDFS FileSystem does define a concat method, but DFSFileSystem appears to be the only
      // implementation, and it deletes the sources as part of concat, which is not what we want.
      // The other FileSystem implementations we inspected throw UnsupportedOperationException
      // from concat.
      boolean success =
          FileUtil.copy(
              fileSystem,
              srcResourceIds.get(i).toPath(),
              fileSystem,
              destResourceIds.get(i).toPath(),
              false,
              true,
              fileSystem.getConf());
      if (!success) {
        // Defensive coding as this should not happen in practice
        throw new IOException(
            String.format(
                "Unable to copy resource %s to %s. No further information provided by underlying filesystem.",
                srcResourceIds.get(i).toPath(), destResourceIds.get(i).toPath()));
      }
    }
  }
  /**
   * Renames a {@link List} of file-like resources from one location to another.
   *
   * <p>The number of source resources must equal the number of destination resources. Destination
   * resources will be created recursively.
   *
   * @param srcResourceIds the references of the source resources
   * @param destResourceIds the references of the destination resources
   * @throws FileNotFoundException if the source resources are missing. When rename throws, the
   *     state of the resources is unknown but safe: for every (source, destination) pair of
   *     resources, the following are possible: a) source exists, b) destination exists, c) source
   *     and destination both exist. Thus no data is lost; however, duplicated resources are
   *     possible. In such scenarios, callers can use {@code match()} to determine the state of the
   *     resources.
   * @throws FileAlreadyExistsException if a target resource already exists and couldn't be
   *     overwritten.
   * @throws IOException if the underlying filesystem indicates the rename was not performed but no
   *     other errors were thrown.
   */
  @Override
  protected void rename(
      List<HadoopResourceId> srcResourceIds, List<HadoopResourceId> destResourceIds)
      throws IOException {
    for (int i = 0; i < srcResourceIds.size(); ++i) {
      Path src = srcResourceIds.get(i).toPath();
      Path dest = destResourceIds.get(i).toPath();

      // rename in HDFS silently fails unless the target directory exists (BEAM-4861), so create
      // it first.
      mkdirs(dest);

      boolean success = fileSystem.rename(src, dest);

      // If the failure was due to the destination file already existing, delete it and retry
      // (BEAM-5036). This should be the exceptional case, so handle it here rather than incur the
      // overhead of testing first.
      if (!success && fileSystem.exists(src) && fileSystem.exists(dest)) {
        LOG.debug(
            String.format(LOG_DELETING_EXISTING_FILE, Path.getPathWithoutSchemeAndAuthority(dest)));
        fileSystem.delete(dest, false); // not recursive
        success = fileSystem.rename(src, dest);
      }

      if (!success) {
        if (!fileSystem.exists(src)) {
          throw new FileNotFoundException(
              String.format("Unable to rename resource %s to %s as source not found.", src, dest));
        } else if (fileSystem.exists(dest)) {
          throw new FileAlreadyExistsException(
              String.format(
                  "Unable to rename resource %s to %s as destination already exists and couldn't be deleted.",
                  src, dest));
        } else {
          throw new IOException(
              String.format(
                  "Unable to rename resource %s to %s. No further information provided by underlying filesystem.",
                  src, dest));
        }
      }
    }
  }
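
  // Illustrative caller-side sketch, per the contract above (fs, sources, destinations, and
  // destSpec are hypothetical): after a failed rename, re-match the destination to learn the
  // resulting state.
  //
  //   try {
  //     fs.rename(sources, destinations);
  //   } catch (IOException e) {
  //     MatchResult state = fs.match(Collections.singletonList(destSpec)).get(0);
  //     // Status.OK means the destination exists; Status.NOT_FOUND means it does not.
  //   }
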
  /** Ensures that the target directory exists for the given filePath. */
  private void mkdirs(Path filePath) throws IOException {
    Path targetDirectory = filePath.getParent();
    if (!fileSystem.exists(targetDirectory)) {
      LOG.debug(
          String.format(
              LOG_CREATE_DIRECTORY, Path.getPathWithoutSchemeAndAuthority(targetDirectory)));
      boolean success = fileSystem.mkdirs(targetDirectory);
      if (!success) {
        throw new IOException(
            String.format(
                "Unable to create target directory %s. No further information provided by underlying filesystem.",
                targetDirectory));
      }
    }
  }
  @Override
  protected void delete(Collection<HadoopResourceId> resourceIds) throws IOException {
    for (HadoopResourceId resourceId : resourceIds) {
      // Ignore the boolean result; failures are surfaced as exceptions.
      fileSystem.delete(resourceId.toPath(), false);
    }
  }
  @Override
  protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
    if (singleResourceSpec.endsWith("/") && !isDirectory) {
      throw new IllegalArgumentException(
          String.format("Expected file path but received directory path %s", singleResourceSpec));
    }
    return !singleResourceSpec.endsWith("/") && isDirectory
        ? new HadoopResourceId(dropEmptyAuthority(singleResourceSpec + "/"))
        : new HadoopResourceId(dropEmptyAuthority(singleResourceSpec));
  }
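
  // For example (hypothetical specs): matchNewResource("hdfs://namenode:8020/out", true) yields a
  // directory resource id for "hdfs://namenode:8020/out/", while
  // matchNewResource("hdfs://namenode:8020/out/part-0", false) yields a file resource id, and a
  // spec ending in "/" combined with isDirectory == false is rejected.
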
  @Override
  protected String getScheme() {
    return fileSystem.getScheme();
  }
  /** An adapter around {@link FSDataInputStream} that implements {@link SeekableByteChannel}. */
  private static class HadoopSeekableByteChannel implements SeekableByteChannel {
    private final FileStatus fileStatus;
    private final FSDataInputStream inputStream;
    private boolean closed;

    private HadoopSeekableByteChannel(FileStatus fileStatus, FSDataInputStream inputStream) {
      this.fileStatus = fileStatus;
      this.inputStream = inputStream;
      this.closed = false;
    }

    @Override
    public int read(ByteBuffer dst) throws IOException {
      if (closed) {
        throw new IOException("Channel is closed");
      }
      // A zero-length read must be supported.
      int read = 0;
      // We avoid the ByteBuffer-based read for heap buffers because some FSDataInputStream
      // implementations are not ByteBufferReadable.
      // See https://issues.apache.org/jira/browse/HADOOP-14603
      if (dst.hasArray()) {
        // Does the same as inputStream.read(dst): stores up to dst.remaining() bytes into
        // dst.array() starting at dst.position(). But dst can have an offset into its backing
        // array, hence the + dst.arrayOffset().
        read = inputStream.read(dst.array(), dst.position() + dst.arrayOffset(), dst.remaining());
      } else {
        // TODO: Add support for off-heap ByteBuffers in case the underlying FSDataInputStream
        // does not support reading from a ByteBuffer.
        read = inputStream.read(dst);
      }
      if (read > 0) {
        dst.position(dst.position() + read);
      }
      return read;
    }

    @Override
    public int write(ByteBuffer src) {
      throw new UnsupportedOperationException();
    }

    @Override
    public long position() throws IOException {
      if (closed) {
        throw new IOException("Channel is closed");
      }
      return inputStream.getPos();
    }

    @Override
    public SeekableByteChannel position(long newPosition) throws IOException {
      if (closed) {
        throw new IOException("Channel is closed");
      }
      inputStream.seek(newPosition);
      return this;
    }

    @Override
    public long size() throws IOException {
      if (closed) {
        throw new IOException("Channel is closed");
      }
      return fileStatus.getLen();
    }

    @Override
    public SeekableByteChannel truncate(long size) {
      throw new UnsupportedOperationException();
    }

    @Override
    public boolean isOpen() {
      return !closed;
    }

    @Override
    public void close() throws IOException {
      closed = true;
      inputStream.close();
    }
  }

  /**
   * Drops an empty authority from a URI string: e.g. the spec "hdfs:///tmp/file" (empty
   * authority) is normalized to "hdfs:/tmp/file", while a URI with a real authority such as
   * "hdfs://namenode:8020/tmp/file" is returned unchanged.
   */
  private static URI dropEmptyAuthority(String uriStr) {
    URI uri = URI.create(uriStr);
    String prefix = uri.getScheme() + ":///";
    if (uriStr.startsWith(prefix)) {
      return URI.create(uri.getScheme() + ":/" + uriStr.substring(prefix.length()));
    } else {
      return uri;
    }
  }
}